import java.util
import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig}


/**
 * Small console program that reads messages from a single Kafka topic through
 * the ZooKeeper-based high-level consumer and prints one line per message.
 */
object KafkaTest {

  /**
   * Creates a consumer connector for the hard-coded topic/group, then prints
   * the topic, group id, consumer id, partition and payload of every message
   * yielded by the topic's stream(s).
   *
   * @param args command-line arguments (unused)
   * @throws IllegalStateException if the connector returns no streams for the topic
   */
  def main(args: Array[String]): Unit = {
    // Local import keeps this block self-contained without touching file-level imports.
    import java.nio.charset.StandardCharsets

    // None of these are ever reassigned, so they are plain vals.
    val groupid = "test"
    val consumerid = "test"
    val topic = "test"

    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181")
    props.setProperty("group.id", groupid)
    props.setProperty("client.id", "spark")
    props.setProperty("consumer.id", consumerid)
    props.setProperty("auto.offset.reset", "smallest")
    props.setProperty("auto.commit.enable", "true")
    props.setProperty("auto.commit.interval.ms", "100")

    val consumer = Consumer.create(new ConsumerConfig(props))

    try {
      // Request a single stream for the topic; fail with a descriptive error
      // instead of an opaque NoSuchElementException if the topic key is missing.
      val streams = consumer
        .createMessageStreams(Map(topic -> 1))
        .getOrElse(
          topic,
          throw new IllegalStateException(s"No message streams were created for topic '$topic'")
        )

      for (stream <- streams) {
        val it = stream.iterator()

        // Runs until the stream's iterator reports no further element (or throws).
        while (it.hasNext()) {
          val record = it.next()

          // Decode explicitly as UTF-8 rather than the platform default charset so
          // output does not vary by JVM/locale. A null payload is rendered as "null"
          // instead of triggering a NullPointerException inside `new String`.
          val payload = Option(record.message())
            .fold("null")(bytes => new String(bytes, StandardCharsets.UTF_8))

          val message = s"Topic:${record.topic}, GroupID:$groupid, Consumer ID:$consumerid, PartitionID:${record.partition}, " +
            s" Message Payload: $payload"

          println(message)
        }
      }
    } finally {
      // Always release the connector, whether the loop ended normally or threw.
      consumer.shutdown()
    }
  }
}
